Email: hyfrankl@umich.edu
# !pip install mrjob
# !pip install pandas
# !pip install seaborn
# !pip install nltk
# !pip install datetime
# !pip install wordcloud
# nltk.download('stopwords')
# nltk.download('omw-1.4')
# nltk.download('wordnet')
# nltk.download('punkt')
# nltk.download('averaged_perceptron_tagger')
# restart the kernel
from authorContrib import AuthorContrib, preload, load_data
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import nltk
from nltk.stem import WordNetLemmatizer
from datetime import datetime
import string
import re
ss = SparkSession.builder.appName('si618prj').getOrCreate()
%%time
# Use SparkSQL to extract data
# preload() is a project helper (authorContrib module) that reads the raw
# CORD-19 metadata CSV into a Spark DataFrame; schema not visible here.
data = preload(ss, input="data/metadata.csv")
CPU times: user 5.38 ms, sys: 2.89 ms, total: 8.28 ms Wall time: 45.3 s
!cp metadata_sub.csv/*.csv subdata.csv
Sub-Task 1. Analyze the number of published papers with MRJob
Use MRJob to get the basic information
# Run the MRJob author-contribution job over the subsampled CSV; results
# land in the "output" directory as part-* files.
job = AuthorContrib(args=["subdata.csv", "-o", "output"])
job.make_runner().run()
No configs specified for inline runner
!cat output/part* > task1.txt
Identify frequent unigrams and bigrams in abstract with PySpark
# Load the MRJob output (tab-separated: author, year, month, count) and
# rename the auto-generated _c0.._c3 columns to meaningful names, then
# register the frame as a SQL view for the queries below.
author_contribution = ss.read.csv('task1.txt', sep='\t', header=False) \
.withColumnRenamed('_c0', 'author') \
.withColumnRenamed('_c1', 'year') \
.withColumnRenamed('_c2', 'month') \
.withColumnRenamed('_c3', 'count')
author_contribution.createOrReplaceTempView('auth_paper')
The distribution of the number of published papers per person between 2002 and 2022
# Total papers per author over 2002-2022, pulled back to pandas for plotting.
publish_distribution = ss.sql("""
select author, sum(count) as cnt from auth_paper
where year between 2002 and 2022 group by author""").toPandas()
# Log-scale y axis: per-author paper counts are heavily right-skewed.
publish_distribution['cnt'].hist()
plt.xlabel('Number of Published Paper')
plt.ylabel('Number of People')
plt.title('The Distribution of the Number of Published Paper per Person ')
plt.yscale('log')
Who frequently appears in the top 5 authors with the most published papers (at least 5) between 2002 and 2022
# Authors who repeatedly rank in a year's top 5 by paper count.
# Inner to outer: (1) per-author yearly totals, keeping only years with
# >= 5 papers; (2) rank authors within each year (ties broken by name);
# (3) keep ranks 1-5; (4) count top-5 appearances and keep authors with
# more than 2 such years.
important_candidates = ss.sql("""
select author, count(1) as c from (
select year, author, cnt from (
select year, author, cnt, row_number() over(partition by year order by cnt desc, author) as nrow from (
select author, year, sum(count) as cnt from auth_paper where year between 2002 and 2022 group by author, year having cnt >= 5
)
) where nrow <= 5
) group by author having c > 2 order by c desc
""")
important_candidates.createOrReplaceTempView('df4')
Sub-Task 2: Frequent Unigrams/Bigrams in the Abstract Along Year 2019 - 2022
Select the monthly frequent unigrams/bigrams (at least 10 times) in the abstract between 2019 and 2022
# Reload the raw metadata with headers and expose it to SparkSQL.
metadata = ss.read.option('header', True).csv("data/metadata.csv")
metadata.createOrReplaceTempView('metadata')
# Keep only 2019-2022 rows that actually have an abstract.
abstract_data = ss.sql("""
select publish_time, abstract from metadata
where year(publish_time) between 2019 and 2022 and abstract is not null""")
# Domain-specific stopwords filtered out of the abstracts in addition to
# NLTK's English stopword list ('cases' was listed twice; deduplicated).
st = ['patients', 'studies', 'study', 'cases', 'results', 'analysis', 'p',
      'years', 'months', 'days', 'd', 'data', 'time', 'number', 'measures', 'countries']
def remove_helper(w1, w2, t1, t2):
    """Return the noun word(s) of a candidate bigram.

    Parameters
    ----------
    w1, w2 : str
        Adjacent words.
    t1, t2 : bool
        Whether w1 / w2 were POS-tagged as nouns.

    Returns
    -------
    list of str
        Both words when both are nouns, the single noun when only one is,
        or an empty list when neither is (previously fell through and
        returned None).
    """
    if t1 and t2:
        return [w1, w2]
    elif t1:
        return [w1]
    elif t2:
        return [w2]
    # Defensive default: callers filter on (t1 or t2), but never return None.
    return []
def remove_stopwords(sentence, stop):
    """Tokenize one sentence and return its noun unigrams/bigrams.

    Parameters
    ----------
    sentence : str
        A single sentence (from nltk.sent_tokenize).
    stop : iterable of str
        Extra stopwords to drop in addition to NLTK's English list.

    Returns
    -------
    list of list of str
        Each element is [w] or [w1, w2] from remove_helper; [] on any
        NLTK failure (missing corpora, bad input).
    """
    try:
        # Imported lazily so Spark worker processes resolve NLTK themselves.
        from nltk.corpus import stopwords
        # Build the combined stopword set once (was rebuilt per token),
        # and avoid shadowing the module-level `st` list.
        stop_set = set(stop) | set(stopwords.words('english'))
        punct_re = re.compile('[' + string.punctuation + ']+')
        lemmatizer = WordNetLemmatizer()
        words = [
            lemmatizer.lemmatize(w.lower())
            for w in nltk.word_tokenize(sentence)
            if (w.lower() not in stop_set) and not punct_re.fullmatch(w)
        ]
        # True where the POS tag marks a noun (NN, NNS, NNP, NNPS).
        marks = [t.startswith('NN') for w, t in nltk.pos_tag(words) if len(w) != 0]
        # Adjacent word pairs where at least one side is a noun.
        bigrams = [remove_helper(w1, w2, t1, t2) for w1, w2, t1, t2 in zip(words[:-1], words[1:], marks[:-1], marks[1:]) if t1 or t2]
        return bigrams
    except Exception:
        # Best-effort: skip sentences NLTK cannot process instead of failing
        # the whole Spark task. Narrowed from a bare `except:`, which also
        # swallowed KeyboardInterrupt/SystemExit.
        return []
def publish_month(x):
    """Convert an ISO date string 'YYYY-MM-DD' to a 'YYYY-M' label.

    The month is not zero-padded ('2020-03-15' -> '2020-3'), matching the
    parsing done later in quarter_freq. Returns '' for malformed or
    missing dates (callers filter on non-empty results).
    """
    try:
        date = datetime.strptime(x, "%Y-%m-%d")
    except (ValueError, TypeError):
        # Narrowed from a bare `except:`: only parse failures (bad format,
        # None input) are expected here.
        return ""
    return str(date.year) + "-" + str(date.month)
# (year-month label, sentence) pairs for rows whose publish_time parses.
q2 = abstract_data.rdd.filter(lambda x: len(publish_month(x[0])) != 0) \
.map(lambda x: (publish_month(x[0]), x[1])) \
.flatMapValues(lambda x: nltk.sent_tokenize(x))
# Explode each sentence into its noun unigrams/bigrams.
q3 = q2.flatMapValues(lambda x: remove_stopwords(x, st))
# Count each (month, ngram) pair, keep counts >= 10, and serialise each
# record as a "month<TAB>ngram<TAB>count" line.
q4 = q3.map(lambda x: (x[0] + "\t" + " ".join(x[1]), 1)) \
.reduceByKey(lambda x,y: x+y) \
.filter(lambda x: x[1] >= 10) \
.map(lambda x: x[0]+"\t"+str(x[1]))
# NOTE(review): collect() pulls everything to the driver purely for notebook
# display; it is not needed for saveAsTextFile below.
q4.collect()
q4.saveAsTextFile('prj_task2')
!cat prj_task2/* >task2.txt
Word Cloud for 2019, 2020, 2021, 2022
# task2.txt columns: 0 = "YYYY-M" month label, 1 = ngram, 2 = count.
word_freq = pd.read_csv("task2.txt", sep='\t', header=None)
from wordcloud import WordCloud
# One word cloud per year, built from that year's 100 most frequent ngrams.
for year in ['2019','2020', '2021', '2022']:
    # Sum counts across the year's months, take the top 100 as {ngram: freq}.
    keywords = word_freq[word_freq[0].str.startswith(year)][[1, 2]] \
        .groupby(1).sum().sort_values(2, ascending=False).iloc[:100].to_dict()[2]
    wordcloud = WordCloud()
    wordcloud.generate_from_frequencies(frequencies=keywords)
    plt.figure(figsize=[8, 4], dpi=300)
    plt.imshow(wordcloud, interpolation="bilinear")
    plt.title(f'Word Cloud for {year}')
    plt.axis("off")
    plt.show()
# from freqNgrams import load_data
confirmed_cases = load_data('data/confirmed_global.csv')
sns.set_theme()
Task 1: The Relationship with Published Researchers
def cnt_mapper(x):
    """Map one author-contribution row to (quarter label, {author}).

    Expects a row with 'year', 'month' and 'author' fields; the quarter
    label has the form 'YYYYQn'.
    """
    quarter_no = (int(x['month']) + 2) // 3
    label = x['year'] + "Q" + str(quarter_no)
    return (label, {x['author']})
def cnt_reducer(x, y):
    """Combine two author sets for the same quarter (set union)."""
    return x.union(y)
# Count distinct publishing authors per quarter: union the author sets for
# each quarter, then take the set size. Rows without a month are dropped.
people = author_contribution.rdd.filter(lambda x: x['month'] != None).map(lambda x: cnt_mapper(x))\
.reduceByKey(lambda x,y: cnt_reducer(x,y)).mapValues(len).collect()
people_df = pd.DataFrame(people, columns=['datetime', 'num'])
# Inner-join with the confirmed-case table on the 'YYYYQn' label and drop
# quarters after 2022Q2.
confirmed_with_people = confirmed_cases.merge(people_df, on='datetime', how='inner')
confirmed_with_people.drop(index=confirmed_with_people.index[confirmed_with_people['datetime'] > "2022Q2"], axis=0, inplace=True)
# NOTE(review): set_style() is called with no argument — presumably intended
# to reset to seaborn's default axes style; confirm a style name wasn't meant.
sns.set_style()
# Newly confirmed cases (top) vs. publishing researchers (bottom), shared x.
fig, axes = plt.subplots(2, 1, figsize=(12, 12), sharex=True)
sns.lineplot(data=confirmed_with_people, x='datetime', y='sum', ax=axes[0])
axes[0].set_ylabel('Number of Newly Confirmed Cases')
# axes[0].set_yscale('log')
sns.lineplot(data=confirmed_with_people, x='datetime', y='num', ax=axes[1])
axes[1].set_ylabel('Number of Published Researchers')
# axes[1].set_yscale('log')
Text(0, 0.5, 'Number of Published Researchers')
# Pearson correlation between case counts and researcher counts with a
# two-quarter lag (cases lead publications by 2 quarters).
np.corrcoef(confirmed_with_people['sum'][:-2], confirmed_with_people['num'][2:])[0, 1]
# The literal below is the captured notebook output of the expression above.
0.09254640944316803
confirmed_with_people.to_csv('task1.csv')
Task 2. The Relationship with Published Paper
# Quarterly paper counts for the frequent top-5 authors identified in df4.
pop_author = ss.sql("""
select author, year, floor((month+2)/3) as quarter, sum(count) as cnt
from auth_paper where author in (select author from df4) group by author, year, quarter
""")
# Pivot to one column per author, one row per (year, quarter).
candidate_contribution = pd.pivot_table(data=pop_author.toPandas(),
index=['year', 'quarter'], columns='author', values='cnt', aggfunc=np.sum, fill_value=0).reset_index()
candidate_contribution['year'] = candidate_contribution['year'].astype(int)
# Join with the quarterly confirmed-case counts.
df_join2 = confirmed_cases.merge(candidate_contribution, on=['year', 'quarter'], how='inner')
# Bar chart of newly confirmed cases (top) vs. per-author quarterly paper
# counts (bottom) on a shared time axis.
fig, axes = plt.subplots(2, 1, figsize=(12, 8), sharex=True)
axes[0].set_title("Newly Confirmed Cases vs. Time")
axes[0].bar(df_join2['datetime'], df_join2['sum'])
axes[0].set_ylabel('Number of Newly Confirmed Cases')  # fixed typo: "Confimed"
axes[1].set_title("Newly Published Paper vs. Time")
# Author columns come after the year/quarter/datetime/sum columns.
for col in df_join2.columns[4:]:
    axes[1].plot(df_join2['datetime'], df_join2[col])
axes[1].set_ylabel('Number of Newly Published Paper')
axes[1].legend(df_join2.columns[4:], bbox_to_anchor=(1.05, 1))
# plt.tight_layout()
fig.supxlabel('Time')
plt.show()
df_join2.to_csv('task2.csv')
Sub-Task 3. The Relationship with Ngrams
def quarter_freq(df, keys):
    """Aggregate monthly n-gram counts to quarters and compute relative
    frequencies for the requested n-grams.

    Parameters
    ----------
    df : pandas.DataFrame
        Columns 0 ('YYYY-M' month label), 1 (n-gram), 2 (count). The
        input frame is no longer mutated (it previously gained
        'year'/'quarter' columns as a side effect).
    keys : list of str
        N-grams to compute per-quarter frequencies for.

    Returns
    -------
    pandas.DataFrame
        One row per (year, quarter) that contains at least one key, with
        'year' (int), 'quarter', 'total' (all n-gram occurrences that
        quarter), 'datetime' ('YYYYQn'), and one '<key>_freq' column per
        key (key count / total).
    """
    df = df.copy()  # avoid mutating the caller's frame
    df['year'] = df[0].apply(lambda x: x.split('-')[0])
    df['quarter'] = df[0].apply(lambda x: (int(x.split('-')[1]) + 2) // 3)
    df1 = df.drop(columns=[0])
    # Denominator: total n-gram occurrences per quarter.
    df2 = df1.drop(columns=[1]).groupby(['year', 'quarter'], as_index=False).sum().rename({2: 'total'}, axis=1)
    # Numerators: per-quarter counts for just the requested keys, one
    # column per key. aggfunc='sum' replaces the deprecated np.sum form.
    df3 = pd.pivot_table(
        data=df1[df1[1].isin(keys)],
        index=['year', 'quarter'],
        columns=1, values=2,
        aggfunc='sum').reset_index()
    df2['datetime'] = df2['year'].apply(str) + "Q" + df2['quarter'].apply(str)
    df4 = df2.merge(df3, on=['year', 'quarter'])
    for key in keys:
        df4[key + "_freq"] = df4[key] / df4['total']
    df4.drop(columns=keys, inplace=True)
    df4['year'] = df4['year'].astype(int)
    return df4
def draw_freq(word_freq, keys, confirmed_cases, mark):
    """Plot quarterly confirmed cases against n-gram frequencies.

    Joins the quarterly n-gram frequency table (quarter_freq) with the
    confirmed-case table, drops quarters after 2022Q3, draws a bar chart
    of cases and a shared log-scale line chart of each key's frequency,
    and returns the joined DataFrame.
    """
    df7 = quarter_freq(word_freq, keys).merge(confirmed_cases, on=['year', 'quarter', 'datetime']).sort_values(['year', 'quarter'])
    df7.drop(index=df7.index[df7['datetime'] > "2022Q3"], inplace=True)
    fig, axes = plt.subplots(2, 1, figsize=(12, 16), sharex=True)
    axes[0].set_title(f"{mark} Confirmed Cases vs. Time")
    axes[0].bar(df7['datetime'], df7['sum'])
    axes[0].set_ylabel(f"{mark} Confirmed Cases")
    # for key, ax in zip(keys, axes[1:]):
    # All keys share the single lower axis; the title/ylabel/yscale calls
    # inside the loop are repeated with the last key's values winning.
    ax = axes[1]
    for key in keys:
        ax.set_title(f"Ngrams Frequency vs. Time")
        ax.plot(df7['datetime'], df7[key+'_freq'])
        ax.set_ylabel(key)
        ax.set_yscale('log')
    ax.legend(keys, bbox_to_anchor=(1.05, 1))
    # plt.tight_layout()
    fig.supxlabel('Time')
    return df7
# N-grams whose frequency trajectories are compared with case counts.
keys = ['health', 'infection', 'disease', 'vaccine', 'respiratory', 'protein', 'rna', 'antibody', 'drug']
combined_key = draw_freq(word_freq, keys, confirmed_cases, "Newly")
plt.show()
# Display the joined table (notebook renders it as the table below).
combined_key
| year | quarter | total | datetime | health_freq | infection_freq | disease_freq | vaccine_freq | respiratory_freq | protein_freq | rna_freq | antibody_freq | drug_freq | sum | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2020 | 2 | 1686772 | 2020Q2 | 0.007974 | 0.011325 | 0.013451 | 0.001655 | 0.004643 | 0.002528 | 0.001216 | 0.001720 | 0.003447 | 9606125.0 |
| 1 | 2020 | 3 | 2386642 | 2020Q3 | 0.007789 | 0.009260 | 0.011296 | 0.001632 | 0.003399 | 0.002785 | 0.001051 | 0.001866 | 0.003155 | 23559695.0 |
| 2 | 2020 | 4 | 2361162 | 2020Q4 | 0.008518 | 0.009332 | 0.010951 | 0.002104 | 0.003352 | 0.002533 | 0.000977 | 0.002090 | 0.003100 | 49745195.0 |
| 3 | 2021 | 1 | 2715100 | 2021Q1 | 0.008961 | 0.008706 | 0.009985 | 0.003068 | 0.002854 | 0.002585 | 0.000925 | 0.002265 | 0.002667 | 45573297.0 |
| 4 | 2021 | 2 | 3030952 | 2021Q2 | 0.008919 | 0.008570 | 0.009286 | 0.003799 | 0.002421 | 0.002639 | 0.000902 | 0.002405 | 0.002367 | 53362089.0 |
| 5 | 2021 | 3 | 2973443 | 2021Q3 | 0.008793 | 0.008476 | 0.008774 | 0.004558 | 0.002165 | 0.002449 | 0.000847 | 0.002568 | 0.002289 | 51634564.0 |
| 6 | 2021 | 4 | 3231277 | 2021Q4 | 0.009070 | 0.008293 | 0.008081 | 0.004781 | 0.001963 | 0.002244 | 0.000712 | 0.002629 | 0.002148 | 54423652.0 |
| 7 | 2022 | 1 | 3259854 | 2022Q1 | 0.008725 | 0.007804 | 0.007926 | 0.004713 | 0.001758 | 0.002362 | 0.000797 | 0.002509 | 0.002224 | 200071900.0 |
| 8 | 2022 | 2 | 1712846 | 2022Q2 | 0.009134 | 0.007761 | 0.007405 | 0.004840 | 0.001734 | 0.002449 | 0.000837 | 0.002508 | 0.002288 | 59000957.0 |
| 9 | 2022 | 3 | 2360 | 2022Q3 | 0.043644 | 0.016949 | 0.026695 | NaN | NaN | 0.012288 | 0.009322 | NaN | 0.005932 | 69980210.0 |
# Two-quarter-lagged Pearson correlation between case counts and each key's
# frequency (NaN where a key has missing quarters).
for key in keys:
    print(key, "coeff:", np.corrcoef(combined_key['sum'][:-2], combined_key[key+'_freq'][2:])[0, 1])
health coeff: 0.9636081568728317 infection coeff: 0.9086832469293848 disease coeff: 0.8987619082608881 vaccine coeff: nan respiratory coeff: nan protein coeff: 0.9555158388586743 rna coeff: 0.9546490843698231 antibody coeff: nan drug coeff: 0.8669767811060866
# Sanity check: confirm the metadata really contains Q3+ 2022 publish dates
# (explains the sparse 2022Q3 row in the table above).
data.createOrReplaceTempView('check')
ss.sql("""select doi, publish_time from check
where year(publish_time) = 2022 and floor((month(publish_time)+2)/3) >= 3 limit 5""").take(5)
[Row(doi='10.1016/j.jad.2022.05.060', publish_time='2022-08-15'), Row(doi='10.1098/rsta.2020.0411', publish_time='2022-07-11'), Row(doi='10.1212/nxi.0000000000001183', publish_time='2022-07-01'), Row(doi='10.24272/j.issn.2095-8137.2021.480', publish_time='2022-07-18'), Row(doi='10.1080/16549716.2022.2058170', publish_time='2022-12-31')]